---
title: "How to create audit logs with Sequin"
sidebarTitle: "Create audit logs"
description: "Build comprehensive audit logging systems with Postgres change data capture (CDC)."
---

Audit logs are an essential tool for tracking and recording changes in your database. With Sequin, you can create comprehensive audit logs of changes in your Postgres database to:

- **Track compliance and security**: Meet regulatory requirements by monitoring sensitive data modifications
- **Debug and recover**: Trace change history to investigate issues and recover from unwanted changes
- **Build user features**: Create activity feeds, change history views, and undo/redo functionality
- **Capture context**: Use [transaction annotations](/reference/annotations) to record who made changes and why

## Prerequisites

If you're self-hosting Sequin, you'll need:

1. [Sequin installed](/running-sequin)
2. [A database connected](/connect-postgres)
3. A destination database ready to receive audit logs
4. A sink destination (like SQS, Kafka, Redis, or HTTP)



<Note>
  If using SQS, be sure to use a FIFO queue.
</Note>

## Architecture overview

Your audit logging pipeline will have these components:

1. **Source tables**: The table or schema in Postgres that you want to audit
2. **Destination sink**: The message queue or webhook endpoint that delivers changes to your processing system (e.g. SQS, Kafka, or HTTP endpoint)
3. **Processor**: An application or service you write that receives changes and writes to your audit tables
4. **Transaction annotations**: Metadata added to your transactions to provide context (who, why, where)

## Create a sink

First, create a sink to the queue, stream, or webhook endpoint that you want to use to process changes:

<Steps>
  <Step title="Select the source">
    Select the table or schema you want to audit.

    Optionally add SQL filters to audit a subset of your source table(s).
  </Step>

  <Step title="Select the message type">
    Leave the default "Changes" message type selected.
  </Step>

  <Step title="Leave message grouping default">
    If your sink supports message grouping, leave the default option selected for "Message grouping".

    This will ensure that messages are [grouped by primary key](/reference/sinks/overview#message-grouping-and-ordering), helping eliminate race conditions as you write audit logs.
  </Step>

  <Step title="Specify backfill">
    If you want to snapshot your current rows from your source table into your audit logs, specify a backfill.

    <Note>
      Backfill [messages](/reference/messages) have an `action` of `read`.
    </Note>
  </Step>

  <Step title="Configure sink-specific settings">
    Configure sink-specific settings and click "Create Sink".
  </Step>
</Steps>

## Annotate your transactions

To make your audit logs more valuable, use [transaction annotations](/reference/annotations) to add context to your changes. For example, you can record who made the change or from where.

<Steps>
  <Step title="Update your application code">
    Modify your database transaction code to include annotations:

    ```sql
    BEGIN;
      -- Set annotations for this transaction
      SELECT pg_logical_emit_message(true, 'sequin:transaction_annotations.set', '{
        "username": "paul.atreides",
        "source": "web_ui",
        "request_id": "req_123",
        "metadata": {
          "reason": "Customer requested update",
          "client_ip": "192.168.1.100"
        }
      }');

      -- Your database operations
      UPDATE users SET email = 'new@example.com' WHERE id = 'user_123';

    COMMIT;
    ```

    These annotations will be included in the change messages for all operations following the annotation statement in this transaction.
  </Step>
</Steps>

## Process changes

Once your sink is configured, changes from your source table will flow to your message queue or HTTP endpoint. Before implementing your audit processor, consider these key requirements for reliable audit logging:

### Important considerations

1. **Idempotency**: Implement idempotent processing to handle edge cases safely

 Duplicates are rare and only occur if your processor successfully writes to the database but fails to acknowledge messages from the queue (SQS/Kafka) or fails to return a 200 status code (HTTP endpoints). In these cases, the message will be redelivered to ensure at-least-once delivery.

 To protect against duplicates, each message includes an `idempotency_key` which is intended for use by your application to reject any [possible duplicate messages](/reference/sinks/overview#guaranteed-delivery).

 The idempotency key is available in the message metadata as `metadata.idempotency_key`. For example, in a change message it would look like:
 ```json
 {
   "metadata": {
     "idempotency_key": "<opaque string>",
     ...
   }
 }
 ```

2. **Type handling**: Cast JSON to Postgres types

    Sequin sends events to your consumer in JSON. Since JSON's types are not as rich as Postgres' types, you'll need to cast values appropriately when writing to your database.

    Common conversions include:
    - Timestamps/dates: Cast from strings to `timestamp` or `date`
    - UUIDs: Cast from strings to `uuid`
    - Numeric types: Cast to `decimal`, `bigint`, etc. based on precision needs

3. **Batch processing**: For better performance, batch your database operations.

### Example: Enhanced audit table with annotations

First, create an audit table to store your change history, including fields for transaction annotations:

```sql create_table.sql
create table audit_logs (
  id serial primary key,
  table_name text not null,
  record_id uuid not null,
  commit_lsn bigint not null,
  action text not null,
  old_values jsonb,
  new_values jsonb,
  created_at timestamp not null default now(),
  updated_at timestamp,
  -- Fields for transaction annotations
  username text,
  source text,
  request_id text,
  metadata jsonb
);

create unique index on audit_logs(commit_lsn, record_id);

-- Optional: Add indexes for common queries
create index on audit_logs(table_name, record_id);
create index on audit_logs(created_at);
create index on audit_logs(username);
create index on audit_logs(request_id);
```

### Process changes with annotations

Map changes and their annotations to your audit log table and perform an upsert operation:

```python process_change.py
def process_change(change):
    # Extract transaction annotations
    annotations = change.metadata.get('transaction_annotations', {})

    record = {
        'record_id': uuid.UUID(change.record['id']),
        'commit_lsn': change.metadata.commit_lsn,
        'table_name': change.metadata.table_name,
        'action': change.action,
        'old_values': json.dumps(change.changes) if change.changes else None,
        'new_values': json.dumps(change.record),
        'created_at': datetime.now(),
        'updated_at': datetime.now(),
        # Transaction annotation fields
        'username': annotations.get('username'),
        'source': annotations.get('source'),
        'request_id': annotations.get('request_id'),
        'metadata': json.dumps(annotations.get('metadata', {}))
    }

    db.execute("""
        insert into audit_logs (
            record_id, commit_lsn, table_name,
            action, old_values, new_values,
            created_at, updated_at,
            username, source, request_id, metadata
        )
        values (
            %(record_id)s, %(commit_lsn)s, %(table_name)s,
            %(action)s, %(old_values)s, %(new_values)s,
            %(created_at)s, %(updated_at)s,
            %(username)s, %(source)s, %(request_id)s, %(metadata)s
        )
        on conflict (record_id, commit_lsn) do update set
            table_name = excluded.table_name,
            action = excluded.action,
            old_values = excluded.old_values,
            new_values = excluded.new_values,
            updated_at = excluded.updated_at,
            username = excluded.username,
            source = excluded.source,
            request_id = excluded.request_id,
            metadata = excluded.metadata
    """, record)
```

<Note>
  For better performance, consider batching multiple changes into a single database operation. Batching increases throughput while still maintaining transactional guarantees.
</Note>

Your audit log table will now be populated with old and new values for each change, along with rich contextual information about who made the change and why.

### Example: Enhanced Activity feed with annotations

With transaction annotations, you can create more informative activity feeds that show not just what changed, but who changed it and why:

```sql
create table activity_feed (
    id serial primary key,
    record_id uuid not null,
    commit_lsn bigint not null,
    action text not null,
    description text not null,
    user_id uuid not null,
    actor text not null,
    source text not null,
    request_id text,
    reason text,
    metadata jsonb,
    created_at timestamp not null default now()
);

create unique index on activity_feed(commit_lsn, record_id);
create index on activity_feed(user_id, created_at);
create index on activity_feed(actor, created_at);
```

Process the annotated changes:

```python process_change.py
def process_change(change):
    # Extract transaction annotations
    annotations = change.metadata.get('transaction_annotations', {})

    description = generate_activity_description(
        change.metadata.table_name,
        change.action,
        change.record,
        annotations
    )

    activity = {
        'record_id': uuid.UUID(change.record['id']),
        'commit_lsn': change.metadata.commit_lsn,
        'action': change.action,
        'description': description,
        'user_id': uuid.UUID(change.record['user_id']),
        'actor': annotations.get('username', 'system'),
        'source': annotations.get('source', 'unknown'),
        'request_id': annotations.get('request_id'),
        'reason': annotations.get('metadata', {}).get('reason'),
        'metadata': json.dumps(change.record),
        'created_at': datetime.now()
    }

    db.execute("""
        insert into activity_feed (
            record_id, commit_lsn, action,
            description, actor, source,
            request_id, reason, metadata, created_at
        )
        values (
            %(record_id)s, %(commit_lsn)s, %(action)s,
            %(description)s, %(actor)s, %(source)s,
            %(request_id)s, %(reason)s, %(metadata)s, %(created_at)s
        )
        on conflict (record_id, commit_lsn) do update set
            action = excluded.action,
            description = excluded.description,
            actor = excluded.actor,
            source = excluded.source,
            request_id = excluded.request_id,
            reason = excluded.reason,
            metadata = excluded.metadata
    """, activity)

def generate_activity_description(table, action, record, annotations):
    actor = annotations.get('username', 'System')

    if table == 'orders':
        if action == 'insert':
            return f"{actor} created order #{record['order_number']}"
        elif action == 'update':
            return f"{actor} updated order #{record['order_number']}"
    # ... etc
```

## Compliance reporting

For regulated industries, you can use annotations to simplify compliance reporting:

```python
def process_compliance_event(change):
    annotations = change.metadata.get('transaction_annotations', {})

    # Track data access for GDPR/HIPAA
    compliance_event = {
        'record_id': uuid.UUID(change.record['id']),
        'commit_lsn': change.metadata.commit_lsn,
        'timestamp': datetime.now(),
        'data_category': determine_data_category(change.metadata.table_name, change.record),
        'operation_type': change.action,
        'user_id': annotations.get('username'),
        'purpose': annotations.get('metadata', {}).get('reason'),
        'lawful_basis': annotations.get('metadata', {}).get('lawful_basis'),
        'system_source': annotations.get('source')
    }

    # Insert into compliance log
    db.execute("""
        insert into compliance_log (
            record_id, commit_lsn, timestamp, data_category, operation_type,
            user_id, purpose, lawful_basis, system_source
        ) values (
            %(record_id)s, %(commit_lsn)s, %(timestamp)s, %(data_category)s, %(operation_type)s,
            %(user_id)s, %(purpose)s, %(lawful_basis)s, %(system_source)s
        )
    """, compliance_event)
```

## Verify your pipeline is working

If you specified a backfill, there should be messages in your sink ready for your system to process:

1. On the sink overview page, click the "Messages" tab. You should see messages flowing to your sink.
2. Check your audit tables to ensure changes are being recorded as expected.
3. Verify that transaction annotations are being correctly captured in your audit logs.

## Maintenance

### Re-syncing your audit logs

You may need to re-sync your audit logs in these scenarios:

1. **Schema changes**: Updates to source or audit table schema
2. **Logic updates**: Changes to audit transformation logic
3. **Data recovery**: Recovering from processing errors

When streaming `changes` without [retention](/reference/change-retention), you can [backfill](/reference/backfills) from the source table. The change messages will be of action `read` and will only include the value currently in the database. Old values and deleted rows are not included.

<Note>
  Note that backfilled records will not include transaction annotations.
</Note>

## Next steps

See "[Deploy to production](/how-to/deploy-to-production)" for guidance on copying your local sink configuration to your production environment.

<CardGroup>
  <Card title="Annotations" icon="pencil" href="/reference/annotations">
    Learn more about how to use transaction annotations.
  </Card>
  <Card title="How to annotate changes" icon="pencil" href="/how-to/annotate-changes">
    Step-by-step guide on adding annotations to your database changes.
  </Card>
</CardGroup>
